package com.tygy.testweibo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @author lijiachenggong
 * @version 1.0
 */
public class WeiBoUtil {

    // 获取配置文件
    private static Configuration configuration = HBaseConfiguration.create();

    static {
        configuration.set("hbase.zookeeper.quorum", "192.168.100.101");
    }

    // 创建命名空间
    public static void createNameSpace(String ns) throws IOException {

        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();
        NamespaceDescriptor descriptor = NamespaceDescriptor.create(ns).build();
        admin.createNamespace(descriptor);
        admin.close();
        connection.close();

    }

    // 创建内容表，用户关系表，最新微博收件表
    public static void createTable(String tableName, int versions, String... cfs) throws IOException {
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();
        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
        for (String cf : cfs) {
            HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cf);
            hColumnDescriptor.setMaxVersions(versions);
            hTableDescriptor.addFamily(hColumnDescriptor);
        }

        admin.createTable(hTableDescriptor);
    }

    // 发布微博
    public static void createData(String uid, String content) throws IOException {
        // 用户在微博内容发布微博（微博内容表）
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT));
        Table relaTable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));

        long ts = System.currentTimeMillis();
        String rowKey = uid + "_" + ts;
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
        contTable.put(put);

        // 用户和粉丝的关系（用户关系表）
        Get get = new Get(Bytes.toBytes(uid));
        get.addFamily(Bytes.toBytes("fans"));
        Result result = relaTable.get(get);

        Cell[] cells = result.rawCells();
        if (cells.length <= 0) {
            return;
        }

        // 粉丝更新用户微博（最新微博收件表）
        ArrayList<Put> arrayList = new ArrayList<>();
        for (Cell cell : cells) {
            byte[] cloneQualifier = CellUtil.cloneQualifier(cell);
            Put inboxPut = new Put(cloneQualifier);
            inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), Bytes.toBytes(rowKey));
            arrayList.add(inboxPut);
        }
        inboxTable.put(arrayList);

        // 关闭资源
        inboxTable.close();
        relaTable.close();
        contTable.close();

        System.out.println("发布成功！");

    }

    // 关注用户
    public static void addAttend(String uid, String... uids) throws IOException {
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table contable = connection.getTable(TableName.valueOf(Constant.CONTENT));
        Table relatable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxtable = connection.getTable(TableName.valueOf(Constant.INBOX));

        // put集合
        ArrayList<Put> puts = new ArrayList<>();
        //添加关注用户的put
        Put relaput = new Put(Bytes.toBytes(uid));

        for (String id : uids) {
            //增加a的关注列表
            relaput.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(id), Bytes.toBytes(id));
            //创建被关注的人的Put
            Put fansPut = new Put(Bytes.toBytes(id));
            //被关注的人在fans列族下增加关注者
            fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
            //在大put里添加一群小put
            puts.add(fansPut);
        }

        puts.add(relaput);
        relatable.put(puts);

        //2 微博内容表

        //创建收件箱表的put对象
        Put inboxput = new Put(Bytes.toBytes(uid));
        //去内容表中获取用户对应关注者的数据
        for (String id : uids) {
            //获得单个用户所有的内容信息，通过 id+“|”来实现 阿斯克码
            Scan scan = new Scan(Bytes.toBytes(id), Bytes.toBytes(id + "|"));
            ResultScanner results = contable.getScanner(scan);
            //得到内容表的rowkey
            for (Result result : results) {
                String rowkey = Bytes.toString(result.getRow());
                String[] split = rowkey.split("_");
                byte[] row = result.getRow();
                //遍历rowkey，放入inbox的普通对象中
                //inboxput.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id),rowkey);
                inboxput.addColumn(Bytes.toBytes("info"), Bytes.toBytes(id), Long.parseLong(split[1]), row);
            }
        }
        inboxtable.put(inboxput);

        //关闭资源
        contable.close();
        relatable.close();
        inboxtable.close();
        connection.close();

        System.out.println("关注成功！");

    }

    // 取关用户
    public static  void  delAttends(String uid,String ...uids)throws IOException{
        Connection connection = ConnectionFactory.createConnection(configuration);

        Table relatable = connection.getTable(TableName.valueOf(Constant.RELATIONS));
        Table inboxtable= connection.getTable(TableName.valueOf(Constant.INBOX));

        Delete reladel = new Delete(Bytes.toBytes(uid));
        ArrayList<Delete> deletes = new ArrayList<>();

        for(String s :uids){
            Delete fansDel = new Delete(Bytes.toBytes(s));
            fansDel.addColumn(Bytes.toBytes("fans"),Bytes.toBytes(uid));
            reladel.addColumn(Bytes.toBytes("attends"),Bytes.toBytes(s));
            deletes.add(fansDel);

        }
        deletes.add(reladel);
        relatable.delete(deletes);
        Delete inboxdel = new Delete(Bytes.toBytes(uid));

        for(String id :uids){
            inboxdel.addColumn(Bytes.toBytes("info"),Bytes.toBytes(id));

        }
        inboxtable.delete(inboxdel);

        relatable.close();
        inboxtable.close();
        connection.close();

        System.out.println("取关成功！");

    }

    // 推送最新微博
    public static void getData(String uid) throws IOException {
        //获取连接
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表对象（最新微博收件箱表、微博内容表）
        Table inboxTable = connection.getTable(TableName.valueOf(Constant.INBOX));
        Table contTable = connection.getTable(TableName.valueOf(Constant.CONTENT));
        //操作最新微博收件箱表，获取对应内容的rowkey
        Get get = new Get(Bytes.toBytes(uid));
        get.setMaxVersions();
        Result result = inboxTable.get(get);
        ArrayList<Get> gets = new ArrayList<>();
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            Get contGet = new Get(CellUtil.cloneValue(cell));
            gets.add(contGet);
        }
        Result[] results = contTable.get(gets);
        //在微博内容表中查询对应微博内容
        for (Result result2 : results) {
            Cell[] cells1 = result2.rawCells();
            for (Cell cell : cells1) {
                //遍历显示微博内容
                System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cell))+
                        "content:"+Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        //关闭资源
        inboxTable.close();
        contTable.close();
        connection.close();

        System.out.println("推送成功！");

    }

    // 查看某人微博内容
    public static void getUserData(String uid) throws IOException {
        //获取连接
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取表对象
        Table table = connection.getTable(TableName.valueOf(Constant.CONTENT));
        //遍历某人的微博内容
        Scan scan = new Scan();
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(uid+"_"));
        scan.setFilter(rowFilter);
        ResultScanner results = table.getScanner(scan);
        for (Result result : results) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cell))+
                        "content:"+Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        //关闭资源
        table.close();
        connection.close();

        System.out.println("查看成功！");

    }
}
